 1. Big Data Computing Engine Spark (Part 1): Key-Value RDD Operations in Spark Core RDD Programming
   
   RDDs fall into two broad categories: Value types and Key-Value types.
   The previous sections covered operations on Value-type RDDs. In practice, Key-Value RDDs,
also called Pair RDDs, are used far more often.
   Operations on Value-type RDDs are mostly defined in RDD.scala:
object RDD {

  private[spark] val CHECKPOINT_ALL_MARKED_ANCESTORS =
    "spark.checkpoint.checkpointAllMarkedAncestors"
	
   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): 
PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }
  
}
   Operations on key-value RDDs are defined in PairRDDFunctions.scala.
   Most of the operators introduced earlier also work on Pair RDDs. In addition, Pair RDDs have
their own Transformation and Action operators.
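The implicit def above is Spark's "extend a type from the outside" pattern: any RDD whose elements are 2-tuples silently gains the PairRDDFunctions methods. The same mechanism can be sketched with plain Scala collections, no Spark required (PairOps and seqToPairOps are illustrative names, not part of Spark):

```scala
import scala.language.implicitConversions

// A wrapper that adds a key-value method to a plain pair sequence,
// playing the role PairRDDFunctions plays for RDD[(K, V)].
class PairOps[K, V](data: Seq[(K, V)]) {
  def reduceByKey(f: (V, V) => V): Map[K, V] =
    data.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(f)) }
}

// The analogue of rddToPairRDDFunctions: applied automatically by the compiler.
implicit def seqToPairOps[K, V](data: Seq[(K, V)]): PairOps[K, V] =
  new PairOps(data)

val pairs = Seq(("a", 1), ("a", 2), ("b", 3))
val summed = pairs.reduceByKey(_ + _) // the implicit conversion kicks in here
```

Because the conversion only matches `Seq[(K, V)]`, a sequence of 3-tuples would not compile with `reduceByKey`, which mirrors why only RDDs of pairs get the PairRDDFunctions operators.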
   
   1). Creating a Pair RDD
val arr = (1 to 10).toArray
val arr1 = arr.map(x => (x, x*10, x*100))
// rdd1 is not a Pair RDD (its elements are 3-tuples, not pairs)
val rdd1 = sc.makeRDD(arr1)

// rdd2 is a Pair RDD (its elements are 2-tuples)
val arr2 = arr.map(x => (x, (x*10, x*100)))
val rdd2 = sc.makeRDD(arr2)
   2). Transformation operations
   (1). map-like operations
   mapValues / flatMapValues / keys / values can all be expressed with map; they are
convenience shorthands.
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
// mapValues is more concise
val b = a.mapValues(x=>1 to x)
b.collect

// the same result can be obtained with map
val b = a.map(x => (x._1, 1 to x._2))
b.collect
val b = a.map{case (k, v) => (k, 1 to v)}
b.collect

// flatMapValues flattens the values
val c = a.flatMapValues(x=>1 to x)
c.collect

val c = a.mapValues(x=>1 to x).flatMap{case (k, v) => v.map(x => (k, x))}
c.collect

c.keys
c.values

c.map{case (k, v) => k}.collect
c.map{case (k, _) => k}.collect
c.map{case (_, v) => v}.collect
   (2). Aggregation operations [important and tricky]
   PairRDD(k, v) aggregations are widely used:
   groupByKey / reduceByKey / foldByKey / aggregateByKey
   combineByKey (old) / combineByKeyWithClassTag (new) => the underlying implementation
   subtractByKey: like subtract; removes elements of this RDD whose keys also appear in the other RDD
   
   A small example. Given the data ("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark",
15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24),
("spark", 16), where each key is a book title and each value is that book's sales on one day,
compute the average value per key, i.e. the average daily sales of each book.
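Before running the Spark variants, the expected answer can be checked with plain Scala collections (no cluster needed), grouping by title and averaging:

```scala
// Same sample data as the Spark examples.
val sales = Seq(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15),
  ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16),
  ("scala", 24), ("spark", 16))

// Group by book title, then average the daily sales per title.
val avg: Map[String, Double] =
  sales.groupBy(_._1).map { case (k, vs) =>
    (k, vs.map(_._2).sum.toDouble / vs.size)
  }
// spark -> 18.2, hadoop -> ~21.67, scala -> 25.0
```

Every groupByKey / reduceByKey / foldByKey / aggregateByKey / combineByKey solution below should reproduce these numbers.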
val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),
("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),
("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))

// groupByKey
rdd.groupByKey().map(x=>(x._1,x._2.sum.toDouble/x._2.size)).collect
rdd.groupByKey().map{case (k, v) => (k,v.sum.toDouble/v.size)}.collect
rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect

// reduceByKey
rdd.mapValues((_, 1)).
reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).
mapValues(x => (x._1.toDouble / x._2)).
collect()

// foldByKey
rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {
(x._1+y._1, x._2+y._2)
}).mapValues(x=>x._1.toDouble/x._2).collect

// aggregateByKey
// aggregateByKey => initial value + intra-partition combine function + inter-partition combine function
rdd.mapValues((_, 1)).
aggregateByKey((0,0))(
(x, y) => (x._1 + y._1, x._2 + y._2),
(a, b) => (a._1 + b._1, a._2 + b._2)
).mapValues(x=>x._1.toDouble / x._2).collect

// the initial value (a tuple) need not have the same type as the RDD's values (Int)
rdd.aggregateByKey((0, 0))(
(x, y) => {println(s"x=$x, y=$y"); (x._1 + y, x._2 + 1)},
(a, b) => {println(s"a=$a, b=$b"); (a._1 + b._1, a._2 + b._2)}
).mapValues(x=>x._1.toDouble/x._2).collect

// intra-partition and inter-partition merging can use different functions; this approach is inefficient!
rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int]
())(
(x, y) => {x.append(y); x},
(a, b) => {a++b}
).mapValues(v => v.sum.toDouble/v.size).collect

// combineByKey (understanding it is enough)
rdd.combineByKey(
(x: Int) => {println(s"x=$x"); (x,1)},
(x: (Int, Int), y: Int) => {println(s"x=$x, y=$y");(x._1+y, x._2+1)},
(a: (Int, Int), b: (Int, Int)) => {println(s"a=$a, b=$b");
(a._1+b._1, a._2+b._2)}
).mapValues(x=>x._1.toDouble/x._2).collect
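To make combineByKey's three functions concrete, here is a single-partition emulation with plain Scala (combineByKeyLocal is a hypothetical helper, not a Spark API): createCombiner fires on the first value seen for a key, mergeValue folds each later value into that key's combiner; mergeCombiners would only fire when merging results across partitions, so it is omitted here.

```scala
// Single-partition emulation of combineByKey's per-partition pass.
def combineByKeyLocal[K, V, C](data: Seq[(K, V)],
                               createCombiner: V => C,
                               mergeValue: (C, V) => C): Map[K, C] =
  data.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
    acc.get(k) match {
      case Some(c) => acc + (k -> mergeValue(c, v)) // key seen before
      case None    => acc + (k -> createCombiner(v)) // first value for key
    }
  }

val sales = Seq(("spark", 12), ("hadoop", 26), ("spark", 15))
// Build (sum, count) combiners, as in the averaging example.
val sumCount = combineByKeyLocal[String, Int, (Int, Int)](
  sales, v => (v, 1), { case ((s, n), v) => (s + v, n + 1) })
// spark -> (27, 2), hadoop -> (26, 1)
```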

// subtractByKey
val rdd1 = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),("hadoop", 23), ("spark", 15)))
val rdd2 = sc.makeRDD(Array(("spark", 100), ("hadoop", 300)))
rdd1.subtractByKey(rdd2).collect()

// subtractByKey
val rdd = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5),("d",5)))
val other = sc.makeRDD(Array(("a",10), ("b",20), ("c",30)))
rdd.subtractByKey(other).collect()
   
   Conclusion: when the alternatives are equally efficient, use the one you know best.
groupByKey is usually the least efficient; avoid it where possible.
   When starting out, getting a working implementation matters most; if you used groupByKey,
look for an operator that can replace it.
   
   groupByKey transfers a large amount of data during the Shuffle, so it is inefficient.
   (3). Sorting operations
   sortByKey operates on a Pair RDD and sorts it by key. RDD[(K, V)] is implicitly converted via
   implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]
(rdd: RDD[(K, V)])
    : OrderedRDDFunctions[K, V, (K, V)] = {
    new OrderedRDDFunctions[K, V, (K, V)](rdd)
  }
and sortByKey itself is implemented in org.apache.spark.rdd.OrderedRDDFunctions:
   def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }
val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"))
val b = sc.parallelize(1 to a.count.toInt)
val c = a.zip(b)
c.sortByKey().collect
c.sortByKey(false).collect
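Note in the source above that the ascending flag only selects between ordering and ordering.reverse. The same idea with plain Scala collections:

```scala
// sortBy takes an implicit Ordering; reversing it flips the sort direction,
// just like setKeyOrdering(ordering) vs setKeyOrdering(ordering.reverse).
val pairs = Seq(("wyp", 1), ("iteblog", 2), ("com", 3))
val asc   = pairs.sortBy(_._1)                           // ascending by key
val desc  = pairs.sortBy(_._1)(Ordering[String].reverse) // descending by key
```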
   (4). Join operations
   cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): 
RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"), (6,"冯七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty && v2.nonEmpty}.collect
// implement join by imitating the source code
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield
(v, w)
)

val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect
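The join source above shows that join is cogroup plus a cross product of the per-key value lists; a key missing on either side contributes an empty list, so the for-comprehension yields nothing for it. A plain-Scala sketch of that relationship (cogroupLocal and joinLocal are illustrative names, not Spark APIs):

```scala
// cogroup: per key, pair up the full value lists from both sides.
def cogroupLocal[K, V, W](left: Seq[(K, V)],
                          right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
  val keys = (left.map(_._1) ++ right.map(_._1)).distinct
  keys.map { k =>
    (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
  }.toMap
}

// join: cross product of the two lists, as in the Spark source above.
def joinLocal[K, V, W](left: Seq[(K, V)],
                       right: Seq[(K, W)]): Seq[(K, (V, W))] =
  cogroupLocal(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
    for (v <- vs; w <- ws) yield (k, (v, w))
  }

val l = Seq(("3", "Scala"), ("4", "Java"))
val r = Seq(("3", "20K"), ("5", "25K"))
val joined = joinLocal(l, r) // only key "3" survives the inner join
```

Replacing the inner for-comprehension with defaults for empty sides gives the leftOuterJoin / rightOuterJoin / fullOuterJoin variants.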
   3). Action operations
   collectAsMap / countByKey / lookup(key)
   countByKey source:
   def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }
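As the source shows, countByKey is just mapValues(_ => 1L) followed by reduceByKey(_ + _). The same two steps with plain Scala collections:

```scala
val data = Seq(("a", 1), ("b", 2), ("a", 5), ("d", 5))

// Step 1: replace each value with 1L; step 2: sum the ones per key.
val counts: Map[String, Long] =
  data.map { case (k, _) => (k, 1L) }
      .groupBy(_._1)
      .map { case (k, ones) => (k, ones.map(_._2).sum) }
// a -> 2, b -> 1, d -> 1
```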
   lookup(key): an efficient lookup method; if the RDD has a partitioner, only the partition that holds the key is searched
   def lookup(key: K): Seq[V] = self.withScope {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        val process = (it: Iterator[(K, V)]) => {
          val buf = new ArrayBuffer[V]
          for (pair <- it if pair._1 == key) {
            buf += pair._2
          }
          buf
        } : Seq[V]
        val res = self.context.runJob(self, process, Array(index))
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("1","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.lookup("1")
rdd2.lookup("3")
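In the source above, the `case None` branch (no partitioner) degrades to filtering the whole dataset and collecting the matching values. That fallback path, sketched with plain Scala collections (lookupLocal is an illustrative name):

```scala
// Equivalent of self.filter(_._1 == key).map(_._2).collect()
def lookupLocal[K, V](data: Seq[(K, V)], key: K): Seq[V] =
  data.filter(_._1 == key).map(_._2)

val rdd1Data = Seq(("1", "Spark"), ("2", "Hadoop"), ("3", "Scala"), ("1", "Java"))
val hits = lookupLocal(rdd1Data, "1") // both values stored under key "1"
```

The `case Some(p)` branch is faster only because the partitioner tells Spark which single partition can contain the key, so runJob touches one partition instead of all of them.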